{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark,ogr,datetime"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 读取中国每个省"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "driver = ogr.GetDriverByName('ESRI Shapefile')\n",
    "dataSource = driver.Open(\"./data/cn.shp\", 0)\n",
    "layer = dataSource.GetLayerByIndex(0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 构造一个用来计算的几何结构，注意这里用wkt，是因为Python无法序列化GDAL的对象为一个列表"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "featArr = []\n",
    "layer.ResetReading()\n",
    "for f in layer:\n",
    "    featArr.append((f.GetField(\"FIRST_NAME\"),\n",
    "                    f.geometry().ExportToIsoWkt()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "北京 POLYGON ((117.201635735 40.077285175,117.188380491 40.0635675290001,117.172605018 40.047241738,117.17614785 40.059573125,117.176306955 40.0598749100001,117.177471185 40.0619996300001,117.182759255 40.0684182210001,117.17511575 40.0703352350001,117.091653325 40.075145455,117.079871615 40.075415475000\n"
     ]
    }
   ],
   "source": [
    "print(featArr[0][0],featArr[0][1][:300])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = pyspark.SparkContext()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def isPoint(line):\n",
    "    pntline = line.split(\",\")\n",
    "    try:\n",
    "        wkt = \"POINT({0} {1})\".format(float(pntline[2]),\n",
    "                                      float(pntline[3]))\n",
    "        geom = ogr.CreateGeometryFromWkt(wkt)\n",
    "        return geom.IsValid()\n",
    "    except:\n",
    "        return False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd = sc.textFile(\"./data/eq2013.csv\"\n",
    "                 ).filter(lambda line : isPoint(line))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['2013/5/31,23:41:56,-113.408,37.175,6.6,2.5,ML2.5,SLC,,UTAH,',\n",
       " '2013/5/31,23:09:05,-113.411,37.178,6,2.5,ML2.5,SLC,,UTAH,',\n",
       " '2013/5/31,22:45:34,-113.413,37.172,4,2.9,ML2.9,SLC,,UTAH,',\n",
       " '2013/5/31,22:34:26,-113.414,37.174,3.2,2.8,ML2.8,SLC,,UTAH,',\n",
       " '2013/5/31,22:34:02,-178.08,51.127,26,3.1,ML3.1,AEIC,,ANDREANOF ISLANDS, ALEUTIAN IS.']"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### map方法，构造一个结构：\n",
    "### 如果这个省包含了点，则为(省名，数量1)\n",
    "### 不在中国任何一个省里面，则为(other，数量1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "def myMap(pnt,featArr):\n",
    "    for feat in featArr:\n",
    "        key = feat[0]\n",
    "        geo = ogr.CreateGeometryFromWkt(feat[1])\n",
    "        pntline = pnt.split(\",\")\n",
    "        wkt = \"POINT({0} {1})\".format(float(pntline[2]),\n",
    "                                      float(pntline[3]))\n",
    "        pntGeom = ogr.CreateGeometryFromWkt(wkt)\n",
    "        if geo.Contains(pntGeom):\n",
    "            return (key,1)\n",
    "    return (\"other\",1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "maprdd = rdd.map(lambda line: myMap(line,featArr))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('西藏', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1), ('other', 1)]\n"
     ]
    }
   ],
   "source": [
    "print(maprdd.take(100))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### reduceByKey是分组聚合"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0:00:48.190894\n"
     ]
    }
   ],
   "source": [
    "s = datetime.datetime.now()\n",
    "res = maprdd.reduceByKey(lambda x,y : x+y).collect()\n",
    "print(datetime.datetime.now() -s)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 计算结果"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('other', 8042), ('云南', 10), ('四川', 69), ('青海', 17), ('山东', 1), ('贵州', 1), ('西藏', 23), ('新疆', 20), ('内蒙古', 4), ('台湾', 12), ('广东', 1), ('广西', 1), ('甘肃', 2), ('辽宁', 1)]\n"
     ]
    }
   ],
   "source": [
    "print(res)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 先把不是中国范围内的数据过滤掉"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 用gdal获取到中国的extent"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(73.61689383500004, 135.08727119000002, 18.278927775000056, 53.56026110500005)"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cnext = layer.GetExtent()\n",
    "cnext"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 写一个过滤方法，过滤中国范围以外的数据"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "def filterExtent(line,cnext):\n",
    "    pntline = line.split(\",\")\n",
    "    x = float(pntline[2])\n",
    "    y = float(pntline[3])\n",
    "    if x >= cnext[0] and x <= cnext[1] \\\n",
    "    and y >= cnext[2] and y <= cnext[3]:\n",
    "        return True\n",
    "    else:\n",
    "        return False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "maprdd2 = rdd.filter(lambda line:filterExtent(line,cnext))\\\n",
    ".map(lambda line: myMap(line,featArr))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 速度显著提升"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0:00:06.146958\n"
     ]
    }
   ],
   "source": [
    "s = datetime.datetime.now()\n",
    "res = maprdd.reduceByKey(lambda x,y : x+y).collect()\n",
    "print(datetime.datetime.now() -s)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('other', 232), ('云南', 10), ('四川', 69), ('青海', 17), ('山东', 1), ('贵州', 1), ('西藏', 23), ('新疆', 20), ('内蒙古', 4), ('台湾', 12), ('广东', 1), ('广西', 1), ('甘肃', 2), ('辽宁', 1)]\n"
     ]
    }
   ],
   "source": [
    "print(res)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
